"""Config flow for ZHA."""

from __future__ import annotations

from abc import abstractmethod
import asyncio
import collections
from contextlib import suppress
from enum import StrEnum
import json
import logging
import os
from typing import Any

import voluptuous as vol
from zha.application.const import RadioType
import zigpy.backups
from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH
from zigpy.exceptions import CannotWriteNetworkSettings, DestructiveWriteNetworkSettings

from homeassistant.components import onboarding, usb
from homeassistant.components.file_upload import process_uploaded_file
from homeassistant.components.hassio import AddonError, AddonState
from homeassistant.components.homeassistant_hardware import silabs_multiprotocol_addon
from homeassistant.components.homeassistant_hardware.firmware_config_flow import (
    ZigbeeFlowStrategy,
)
from homeassistant.components.homeassistant_yellow import hardware as yellow_hardware
from homeassistant.components.usb import USBDevice, scan_serial_ports
from homeassistant.config_entries import (
    SOURCE_IGNORE,
    SOURCE_ZEROCONF,
    ConfigEntry,
    ConfigEntryBaseFlow,
    ConfigEntryState,
    ConfigFlow,
    ConfigFlowResult,
    OperationNotAllowed,
    OptionsFlow,
)
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.hassio import is_hassio
from homeassistant.helpers.selector import FileSelector, FileSelectorConfig
from homeassistant.helpers.service_info.usb import UsbServiceInfo
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from homeassistant.util import dt as dt_util

from .const import CONF_BAUDRATE, CONF_FLOW_CONTROL, CONF_RADIO_TYPE, DOMAIN
from .helpers import get_config_entry_unique_id, get_zha_gateway
from .radio_manager import (
    DEVICE_SCHEMA,
    HARDWARE_DISCOVERY_SCHEMA,
    RECOMMENDED_RADIOS,
    ProbeResult,
    ZhaRadioManager,
)

_LOGGER = logging.getLogger(__name__)

CONF_MANUAL_PATH = "Enter Manually"
DECONZ_DOMAIN = "deconz"

# The ZHA config flow takes different branches depending on if you are migrating to a
# new adapter via discovery or setting it up from scratch

# For the fast path, we automatically migrate everything and restore the most recent backup
MIGRATION_STRATEGY_RECOMMENDED = "migration_strategy_recommended"
MIGRATION_STRATEGY_ADVANCED = "migration_strategy_advanced"

# Similarly, setup follows the same approach: we create a new network
SETUP_STRATEGY_RECOMMENDED = "setup_strategy_recommended"
SETUP_STRATEGY_ADVANCED = "setup_strategy_advanced"

# For the advanced paths, we allow users to pick how to form a network: form a brand new
# network, use the settings currently on the stick, restore from a database backup, or
# restore from a JSON backup
FORMATION_STRATEGY = "formation_strategy"
FORMATION_FORM_NEW_NETWORK = "form_new_network"
FORMATION_FORM_INITIAL_NETWORK = "form_initial_network"
FORMATION_REUSE_SETTINGS = "reuse_settings"
FORMATION_CHOOSE_AUTOMATIC_BACKUP = "choose_automatic_backup"
FORMATION_UPLOAD_MANUAL_BACKUP = "upload_manual_backup"

CHOOSE_AUTOMATIC_BACKUP = "choose_automatic_backup"
OVERWRITE_COORDINATOR_IEEE = "overwrite_coordinator_ieee"

UPLOADED_BACKUP_FILE = "uploaded_backup_file"

REPAIR_MY_URL = "https://my.home-assistant.io/redirect/repairs/"

LEGACY_ZEROCONF_PORT = 6638
LEGACY_ZEROCONF_ESPHOME_API_PORT = 6053

ZEROCONF_SERVICE_TYPE = "_zigbee-coordinator._tcp.local."
ZEROCONF_PROPERTIES_SCHEMA = vol.Schema(
    {
        vol.Required("radio_type"): vol.All(str, vol.In([t.name for t in RadioType])),
        vol.Required("serial_number"): str,
    },
    extra=vol.ALLOW_EXTRA,
)


class OptionsMigrationIntent(StrEnum):
    """Zigbee options flow intents."""

    MIGRATE = "intent_migrate"
    RECONFIGURE = "intent_reconfigure"


def _format_backup_choice(
    backup: zigpy.backups.NetworkBackup, *, pan_ids: bool = True
) -> str:
    """Format network backup info into a short piece of text."""
    if not pan_ids:
        return dt_util.as_local(backup.backup_time).strftime("%c")

    identifier = (
        # PAN ID
        f"{str(backup.network_info.pan_id)[2:]}"
        # EPID
        f":{str(backup.network_info.extended_pan_id).replace(':', '')}"
    ).lower()

    return f"{dt_util.as_local(backup.backup_time).strftime('%c')} ({identifier})"


async def list_serial_ports(hass: HomeAssistant) -> list[USBDevice]:
    """List all serial ports, including the Yellow radio and the multi-PAN addon."""
    ports: list[USBDevice] = []
    ports.extend(await hass.async_add_executor_job(scan_serial_ports))

    # Add useful info to the Yellow's serial port selection screen
    try:
        yellow_hardware.async_info(hass)
    except HomeAssistantError:
        pass
    else:
        # PySerial does not properly handle the Yellow's serial port with the CM5
        # so we manually include it
        port = USBDevice(
            device="/dev/ttyAMA1",
            vid="ffff",  # This is technically not a USB device
            pid="ffff",
            serial_number=None,
            manufacturer="Nabu Casa",
            description="Yellow Zigbee module",
        )

        ports = [p for p in ports if not p.device.startswith("/dev/ttyAMA")]
        ports.insert(0, port)

    if is_hassio(hass):
        # Present the multi-PAN addon as a setup option, if it's available
        multipan_manager = (
            await silabs_multiprotocol_addon.get_multiprotocol_addon_manager(hass)
        )

        try:
            addon_info = await multipan_manager.async_get_addon_info()
        except (AddonError, KeyError):
            addon_info = None

        if addon_info is not None and addon_info.state != AddonState.NOT_INSTALLED:
            addon_port = USBDevice(
                device=silabs_multiprotocol_addon.get_zigbee_socket(),
                vid="ffff",  # This is technically not a USB device
                pid="ffff",
                serial_number=None,
                manufacturer="Nabu Casa",
                description="Silicon Labs Multiprotocol add-on",
            )

            ports.append(addon_port)

    return ports


class BaseZhaFlow(ConfigEntryBaseFlow):
    """Mixin for common ZHA flow steps and forms."""

    _flow_strategy: ZigbeeFlowStrategy | None = None
    _overwrite_ieee_during_restore: bool = False
    _hass: HomeAssistant
    _title: str

    def __init__(self) -> None:
        """Initialize flow instance."""
        super().__init__()

        self._hass = None  # type: ignore[assignment]
        self._radio_mgr = ZhaRadioManager()
        self._restore_backup_task: asyncio.Task[None] | None = None
        self._reset_old_radio_task: asyncio.Task[None] | None = None
        self._form_network_task: asyncio.Task[None] | None = None

        # Progress flow steps cannot abort so we need to store the abort reason and then
        # re-raise it in a dedicated step
        self._progress_error: AbortFlow | None = None

    @property
    def hass(self) -> HomeAssistant:
        """Return hass."""
        return self._hass

    @hass.setter
    def hass(self, hass: HomeAssistant) -> None:
        """Set hass."""
        self._hass = hass
        self._radio_mgr.hass = hass

    def _get_config_entry_data(self) -> dict[str, Any]:
        """Extract ZHA config entry data from the radio manager."""
        assert self._radio_mgr.radio_type is not None
        assert self._radio_mgr.device_path is not None
        assert self._radio_mgr.device_settings is not None

        return {
            CONF_DEVICE: DEVICE_SCHEMA(
                {
                    **self._radio_mgr.device_settings,
                    CONF_DEVICE_PATH: self._radio_mgr.device_path,
                }
            ),
            CONF_RADIO_TYPE: self._radio_mgr.radio_type.name,
        }

    @abstractmethod
    async def _async_create_radio_entry(self) -> ConfigFlowResult:
        """Create a config entry with the current flow state."""

    async def async_step_progress_failed(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Abort when progress step failed."""
        assert self._progress_error is not None
        raise self._progress_error

    async def async_step_choose_serial_port(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Choose a serial port."""
        ports = await list_serial_ports(self.hass)

        # The full `/dev/serial/by-id/` path is too verbose to show
        resolved_paths = {
            p.device: await self.hass.async_add_executor_job(os.path.realpath, p.device)
            for p in ports
        }

        list_of_ports = [
            f"{resolved_paths[p.device]} - {p.description}{', s/n: ' + p.serial_number if p.serial_number else ''}"
            + (f" - {p.manufacturer}" if p.manufacturer else "")
            for p in ports
        ]

        if not list_of_ports:
            return await self.async_step_manual_pick_radio_type()

        list_of_ports.append(CONF_MANUAL_PATH)

        if user_input is not None:
            user_selection = user_input[CONF_DEVICE_PATH]

            if user_selection == CONF_MANUAL_PATH:
                return await self.async_step_manual_pick_radio_type()

            port = ports[list_of_ports.index(user_selection)]
            self._radio_mgr.device_path = port.device

            probe_result = await self._radio_mgr.detect_radio_type()
            if probe_result == ProbeResult.WRONG_FIRMWARE_INSTALLED:
                return self.async_abort(
                    reason="wrong_firmware_installed",
                    description_placeholders={"repair_url": REPAIR_MY_URL},
                )
            if probe_result == ProbeResult.PROBING_FAILED:
                # Did not autodetect anything, proceed to manual selection
                return await self.async_step_manual_pick_radio_type()

            self._title = (
                f"{port.description}{', s/n: ' + port.serial_number if port.serial_number else ''}"
                f" - {port.manufacturer}"
                if port.manufacturer
                else ""
            )

            return await self.async_step_verify_radio()

        # Pre-select the currently configured port
        default_port: vol.Undefined | str = vol.UNDEFINED

        if self._radio_mgr.device_path is not None:
            for description, port in zip(list_of_ports, ports, strict=False):
                if port.device == self._radio_mgr.device_path:
                    default_port = description
                    break
            else:
                default_port = CONF_MANUAL_PATH

        schema = vol.Schema(
            {
                vol.Required(CONF_DEVICE_PATH, default=default_port): vol.In(
                    list_of_ports
                )
            }
        )
        return self.async_show_form(step_id="choose_serial_port", data_schema=schema)

    async def async_step_manual_pick_radio_type(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Manually select the radio type."""
        if user_input is not None:
            self._radio_mgr.radio_type = RadioType.get_by_description(
                user_input[CONF_RADIO_TYPE]
            )
            return await self.async_step_manual_port_config()

        # Pre-select the current radio type
        default: vol.Undefined | str = vol.UNDEFINED

        if self._radio_mgr.radio_type is not None:
            default = self._radio_mgr.radio_type.description

        schema = {
            vol.Required(CONF_RADIO_TYPE, default=default): vol.In(RadioType.list())
        }

        return self.async_show_form(
            step_id="manual_pick_radio_type",
            data_schema=vol.Schema(schema),
        )

    async def async_step_manual_port_config(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Enter port settings specific for this type of radio."""
        assert self._radio_mgr.radio_type is not None
        errors = {}

        if user_input is not None:
            self._title = user_input[CONF_DEVICE_PATH]
            self._radio_mgr.device_path = user_input[CONF_DEVICE_PATH]
            self._radio_mgr.device_settings = DEVICE_SCHEMA(
                {
                    CONF_DEVICE_PATH: self._radio_mgr.device_path,
                    CONF_BAUDRATE: user_input[CONF_BAUDRATE],
                    # `None` shows up as the empty string in the frontend
                    CONF_FLOW_CONTROL: (
                        user_input[CONF_FLOW_CONTROL]
                        if user_input[CONF_FLOW_CONTROL] != "none"
                        else None
                    ),
                }
            )

            if await self._radio_mgr.radio_type.controller.probe(
                self._radio_mgr.device_settings
            ):
                return await self.async_step_verify_radio()

            errors["base"] = "cannot_connect"

        device_settings = self._radio_mgr.device_settings or {}

        return self.async_show_form(
            step_id="manual_port_config",
            data_schema=vol.Schema(
                {
                    vol.Required(
                        CONF_DEVICE_PATH,
                        default=self._radio_mgr.device_path or vol.UNDEFINED,
                    ): str,
                    vol.Required(
                        CONF_BAUDRATE,
                        default=device_settings.get(CONF_BAUDRATE) or 115200,
                    ): int,
                    vol.Required(
                        CONF_FLOW_CONTROL,
                        default=device_settings.get(CONF_FLOW_CONTROL) or "none",
                    ): vol.In(["hardware", "software", "none"]),
                }
            ),
            errors=errors,
        )

    async def async_step_verify_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Add a warning step to dissuade the use of deprecated radios."""
        assert self._radio_mgr.radio_type is not None
        await self._radio_mgr.async_read_backups_from_database()

        # Skip this step if we are using a recommended radio
        if user_input is not None or self._radio_mgr.radio_type in RECOMMENDED_RADIOS:
            # ZHA disables the single instance check and will decide at runtime if we
            # are migrating or setting up from scratch
            if self.hass.config_entries.async_entries(DOMAIN, include_ignore=False):
                return await self.async_step_choose_migration_strategy()
            return await self.async_step_choose_setup_strategy()

        return self.async_show_form(
            step_id="verify_radio",
            description_placeholders={
                CONF_NAME: self._radio_mgr.radio_type.description,
                "docs_recommended_adapters_url": (
                    "https://www.home-assistant.io/integrations/zha/#recommended-zigbee-radio-adapters-and-modules"
                ),
            },
        )

    async def async_step_choose_setup_strategy(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Choose how to set up the integration from scratch."""
        if self._flow_strategy == ZigbeeFlowStrategy.RECOMMENDED:
            # Fast path: automatically form a new network
            return await self.async_step_setup_strategy_recommended()
        if self._flow_strategy == ZigbeeFlowStrategy.ADVANCED:
            # Advanced path: let the user choose
            return await self.async_step_setup_strategy_advanced()

        # Allow onboarding for new users to just create a new network automatically
        if (
            not onboarding.async_is_onboarded(self.hass)
            and not self.hass.config_entries.async_entries(DOMAIN, include_ignore=False)
            and not self._radio_mgr.backups
        ):
            return await self.async_step_setup_strategy_recommended()

        return self.async_show_menu(
            step_id="choose_setup_strategy",
            menu_options=[
                SETUP_STRATEGY_RECOMMENDED,
                SETUP_STRATEGY_ADVANCED,
            ],
        )

    async def async_step_setup_strategy_recommended(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Recommended setup strategy: form a brand-new network."""
        return await self.async_step_form_new_network()

    async def async_step_setup_strategy_advanced(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Advanced setup strategy: let the user choose."""
        return await self.async_step_choose_formation_strategy()

    async def async_step_choose_migration_strategy(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Choose how to deal with the current radio's settings during migration."""
        if self._flow_strategy == ZigbeeFlowStrategy.RECOMMENDED:
            # Fast path: automatically migrate everything
            return await self.async_step_migration_strategy_recommended()
        if self._flow_strategy == ZigbeeFlowStrategy.ADVANCED:
            # Advanced path: let the user choose
            return await self.async_step_migration_strategy_advanced()
        return self.async_show_menu(
            step_id="choose_migration_strategy",
            menu_options=[
                MIGRATION_STRATEGY_RECOMMENDED,
                MIGRATION_STRATEGY_ADVANCED,
            ],
        )

    async def async_step_migration_strategy_recommended(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Recommended migration strategy: automatically migrate everything."""

        # Assume the most recent backup is the correct one
        self._radio_mgr.chosen_backup = self._radio_mgr.backups[0]
        return await self.async_step_maybe_reset_old_radio()

    async def _async_reset_old_radio(self, config_entry: ConfigEntry) -> None:
        """Do the work of resetting the old radio."""

        # Unload ZHA before connecting to the old adapter
        with suppress(OperationNotAllowed):
            await self.hass.config_entries.async_unload(config_entry.entry_id)

        # Create a radio manager to connect to the old stick to reset it
        temp_radio_mgr = ZhaRadioManager()
        temp_radio_mgr.hass = self.hass
        temp_radio_mgr.device_path = config_entry.data[CONF_DEVICE][CONF_DEVICE_PATH]
        temp_radio_mgr.device_settings = config_entry.data[CONF_DEVICE]
        temp_radio_mgr.radio_type = RadioType[config_entry.data[CONF_RADIO_TYPE]]

        await temp_radio_mgr.async_reset_adapter()

    async def async_step_maybe_reset_old_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Erase the old radio's network settings before migration."""

        # Like in the options flow, pull the correct settings from the config entry
        config_entries = self.hass.config_entries.async_entries(
            DOMAIN, include_ignore=False
        )

        if not config_entries:
            return await self.async_step_restore_backup()

        if self._reset_old_radio_task is None:
            # This will only ever be called during migration, so there must be an
            # existing config entry
            assert len(config_entries) == 1
            config_entry = config_entries[0]

            self._reset_old_radio_task = self.hass.async_create_task(
                self._async_reset_old_radio(config_entry),
                "Reset old radio",
            )

        if not self._reset_old_radio_task.done():
            return self.async_show_progress(
                step_id="maybe_reset_old_radio",
                progress_action="maybe_reset_old_radio",
                progress_task=self._reset_old_radio_task,
            )

        try:
            await self._reset_old_radio_task
        except HomeAssistantError:
            _LOGGER.exception("Failed to reset old radio during migration")
            # Old adapter not found or cannot connect, show prompt to plug back in
            return self.async_show_progress_done(next_step_id="plug_in_old_radio")
        finally:
            self._reset_old_radio_task = None

        return self.async_show_progress_done(next_step_id="restore_backup")

    async def async_step_plug_in_old_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Prompt user to plug in the old radio if connection fails."""
        config_entries = self.hass.config_entries.async_entries(
            DOMAIN, include_ignore=False
        )

        # Unless the user removes the config entry whilst we try to reset the old radio
        # for a few seconds and then also unplugs it, we will basically never hit this
        if not config_entries:
            return await self.async_step_restore_backup()

        config_entry = config_entries[0]
        old_device_path = config_entry.data[CONF_DEVICE][CONF_DEVICE_PATH]

        return self.async_show_menu(
            step_id="plug_in_old_radio",
            menu_options=["retry_old_radio", "skip_reset_old_radio"],
            description_placeholders={"device_path": old_device_path},
        )

    async def async_step_retry_old_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Retry connecting to the old radio."""
        return await self.async_step_maybe_reset_old_radio()

    async def async_step_skip_reset_old_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Skip resetting the old radio and continue with migration."""
        return await self.async_step_restore_backup()

    async def async_step_pre_plug_in_new_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Strip user_input before showing "plug in new radio" form."""
        # This step is necessary to prevent `user_input` from being passed through
        return await self.async_step_plug_in_new_radio()

    async def async_step_plug_in_new_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Prompt user to plug in the new radio if connection fails."""
        if user_input is not None:
            # User confirmed, retry now
            return await self.async_step_restore_backup()

        assert self._radio_mgr.device_path is not None

        return self.async_show_form(
            step_id="plug_in_new_radio",
            description_placeholders={"device_path": self._radio_mgr.device_path},
        )

    async def async_step_migration_strategy_advanced(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Advanced migration strategy: let the user choose."""
        return await self.async_step_choose_formation_strategy()

    async def async_step_choose_formation_strategy(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Choose how to deal with the current radio's settings."""
        await self._radio_mgr.async_load_network_settings()

        strategies = []

        # Check if we have any automatic backups *and* if the backups differ from
        # the current radio settings, if they exist (since restoring would be redundant)
        if self._radio_mgr.backups and (
            self._radio_mgr.current_settings is None
            or any(
                not backup.is_compatible_with(self._radio_mgr.current_settings)
                for backup in self._radio_mgr.backups
            )
        ):
            strategies.append(CHOOSE_AUTOMATIC_BACKUP)

        if self._radio_mgr.current_settings is not None:
            strategies.append(FORMATION_REUSE_SETTINGS)

        strategies.append(FORMATION_UPLOAD_MANUAL_BACKUP)

        # Do not show "erase network settings" if there are none to erase
        if self._radio_mgr.current_settings is None:
            strategies.append(FORMATION_FORM_INITIAL_NETWORK)
        else:
            strategies.append(FORMATION_FORM_NEW_NETWORK)

        # Automatically form a new network if we're onboarding with a brand new radio
        if not onboarding.async_is_onboarded(self.hass) and set(strategies) == {
            FORMATION_UPLOAD_MANUAL_BACKUP,
            FORMATION_FORM_INITIAL_NETWORK,
        }:
            return await self.async_step_form_initial_network()

        # Otherwise, let the user choose
        return self.async_show_menu(
            step_id="choose_formation_strategy",
            menu_options=strategies,
        )

    async def async_step_reuse_settings(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Reuse the existing network settings on the stick."""
        return await self._async_create_radio_entry()

    async def async_step_form_initial_network(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Form an initial network."""
        # This step exists only for translations, it does nothing new
        return await self.async_step_form_new_network(user_input)

    async def _async_form_new_network(self) -> None:
        """Do the work of forming a new network."""
        await self._radio_mgr.async_form_network()
        # Load the newly formed network settings to get the network info
        await self._radio_mgr.async_load_network_settings()

    async def async_step_form_new_network(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Form a brand-new network."""
        if self._form_network_task is None:
            self._form_network_task = self.hass.async_create_task(
                self._async_form_new_network(),
                "Form new network",
            )

        if not self._form_network_task.done():
            return self.async_show_progress(
                step_id="form_new_network",
                progress_action="form_new_network",
                progress_task=self._form_network_task,
            )

        try:
            await self._form_network_task
        except Exception as exc:
            _LOGGER.exception("Failed to form new network")
            self._progress_error = AbortFlow(
                reason="cannot_form_network",
                description_placeholders={"error": str(exc)},
            )
            return self.async_show_progress_done(next_step_id="progress_failed")
        finally:
            self._form_network_task = None

        return self.async_show_progress_done(next_step_id="create_entry")

    def _parse_uploaded_backup(
        self, uploaded_file_id: str
    ) -> zigpy.backups.NetworkBackup:
        """Read and parse an uploaded backup JSON file."""
        with process_uploaded_file(self.hass, uploaded_file_id) as file_path:
            contents = file_path.read_text()

        return zigpy.backups.NetworkBackup.from_dict(json.loads(contents))

    async def async_step_upload_manual_backup(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Upload and restore a coordinator backup JSON file."""
        errors = {}

        if user_input is not None:
            try:
                self._radio_mgr.chosen_backup = await self.hass.async_add_executor_job(
                    self._parse_uploaded_backup, user_input[UPLOADED_BACKUP_FILE]
                )
            except ValueError:
                errors["base"] = "invalid_backup_json"
            else:
                return await self.async_step_maybe_reset_old_radio()

        return self.async_show_form(
            step_id="upload_manual_backup",
            data_schema=vol.Schema(
                {
                    vol.Required(UPLOADED_BACKUP_FILE): FileSelector(
                        FileSelectorConfig(accept=".json,application/json")
                    )
                }
            ),
            errors=errors,
        )

    async def async_step_choose_automatic_backup(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Choose an automatic backup."""
        if self.show_advanced_options:
            # Always show the PAN IDs when in advanced mode
            choices = [
                _format_backup_choice(backup, pan_ids=True)
                for backup in self._radio_mgr.backups
            ]
        else:
            # Only show the PAN IDs for multiple backups taken on the same day
            num_backups_on_date = collections.Counter(
                backup.backup_time.date() for backup in self._radio_mgr.backups
            )
            choices = [
                _format_backup_choice(
                    backup, pan_ids=(num_backups_on_date[backup.backup_time.date()] > 1)
                )
                for backup in self._radio_mgr.backups
            ]

        if user_input is not None:
            index = choices.index(user_input[CHOOSE_AUTOMATIC_BACKUP])
            self._radio_mgr.chosen_backup = self._radio_mgr.backups[index]

            return await self.async_step_maybe_reset_old_radio()

        return self.async_show_form(
            step_id="choose_automatic_backup",
            data_schema=vol.Schema(
                {
                    vol.Required(CHOOSE_AUTOMATIC_BACKUP, default=choices[0]): vol.In(
                        choices
                    ),
                }
            ),
        )

    async def async_step_restore_backup(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Restore network backup to new radio."""
        if self._restore_backup_task is None:
            self._restore_backup_task = self.hass.async_create_task(
                self._radio_mgr.restore_backup(
                    overwrite_ieee=self._overwrite_ieee_during_restore
                ),
                "Restore backup",
            )

        if not self._restore_backup_task.done():
            return self.async_show_progress(
                step_id="restore_backup",
                progress_action="restore_backup",
                progress_task=self._restore_backup_task,
            )

        try:
            await self._restore_backup_task
        except DestructiveWriteNetworkSettings:
            # If we cannot restore without overwriting the IEEE, ask for confirmation
            return self.async_show_progress_done(
                next_step_id="pre_confirm_ezsp_ieee_overwrite"
            )
        except HomeAssistantError:
            _LOGGER.exception("Failed to restore network backup to new radio")
            # User unplugged the new adapter, allow retry
            return self.async_show_progress_done(next_step_id="pre_plug_in_new_radio")
        except CannotWriteNetworkSettings as exc:
            self._progress_error = AbortFlow(
                reason="cannot_restore_backup",
                description_placeholders={"error": str(exc)},
            )
            return self.async_show_progress_done(next_step_id="progress_failed")
        finally:
            self._restore_backup_task = None

        # Otherwise, proceed to entry creation
        return self.async_show_progress_done(next_step_id="create_entry")

    async def async_step_pre_confirm_ezsp_ieee_overwrite(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Strip user_input before showing confirmation form."""
        # This step is necessary to prevent `user_input` from being passed through
        return await self.async_step_confirm_ezsp_ieee_overwrite()

    async def async_step_confirm_ezsp_ieee_overwrite(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Show confirmation form for EZSP IEEE address overwrite."""
        if user_input is None:
            return self.async_show_form(
                step_id="confirm_ezsp_ieee_overwrite",
                data_schema=vol.Schema(
                    {vol.Required(OVERWRITE_COORDINATOR_IEEE, default=True): bool}
                ),
            )

        if not user_input[OVERWRITE_COORDINATOR_IEEE]:
            return self.async_abort(reason="cannot_restore_backup_no_ieee_confirm")

        self._overwrite_ieee_during_restore = True
        return await self.async_step_restore_backup()

    async def async_step_create_entry(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Create the config entry after successful setup/migration."""

        # This step only exists so that we can create entries from other steps
        return await self._async_create_radio_entry()


class ZhaConfigFlowHandler(BaseZhaFlow, ConfigFlow, domain=DOMAIN):
    """Handle a config flow."""

    VERSION = 5

    async def _set_unique_id_and_update_ignored_flow(
        self, unique_id: str, device_path: str
    ) -> None:
        """Set the flow's unique ID and update the device path in an ignored flow."""
        current_entry = await self.async_set_unique_id(unique_id)

        # Only update the current entry if it is an ignored discovery
        if current_entry and current_entry.source == SOURCE_IGNORE:
            self._abort_if_unique_id_configured(
                updates={
                    CONF_DEVICE: {
                        **current_entry.data.get(CONF_DEVICE, {}),
                        CONF_DEVICE_PATH: device_path,
                    },
                }
            )

    @staticmethod
    @callback
    def async_get_options_flow(
        config_entry: ConfigEntry,
    ) -> OptionsFlow:
        """Create the options flow."""
        return ZhaOptionsFlowHandler(config_entry)

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a ZHA config flow start."""
        return await self.async_step_choose_serial_port(user_input)

    async def async_step_confirm(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Confirm a discovery."""

        self._set_confirm_only()

        zha_config_entries = self.hass.config_entries.async_entries(
            DOMAIN, include_ignore=False
        )

        if self._radio_mgr.device_path is not None:
            # Ensure the radio manager device path is unique and will match ZHA's
            try:
                self._radio_mgr.device_path = await self.hass.async_add_executor_job(
                    usb.get_serial_by_id, self._radio_mgr.device_path
                )
            except OSError as error:
                raise AbortFlow(
                    reason="cannot_resolve_path",
                    description_placeholders={"path": self._radio_mgr.device_path},
                ) from error

            # mDNS discovery can advertise the same adapter on multiple IPs or via a
            # hostname, which should be considered a duplicate
            current_device_paths = {self._radio_mgr.device_path}

            if self.source == SOURCE_ZEROCONF:
                discovery_info = self.init_data
                current_device_paths |= {
                    f"socket://{ip}:{discovery_info.port}"
                    for ip in discovery_info.ip_addresses
                }

            for entry in zha_config_entries:
                path = entry.data.get(CONF_DEVICE, {}).get(CONF_DEVICE_PATH)

                # Abort discovery if the device path is already configured
                if path is not None and path in current_device_paths:
                    return self.async_abort(reason="single_instance_allowed")

        # Without confirmation, discovery can automatically progress into parts of the
        # config flow logic that interacts with hardware.
        # Ignore Zeroconf discoveries during onboarding, as they may be in use already.
        if user_input is not None or (
            not onboarding.async_is_onboarded(self.hass)
            and not zha_config_entries
            and self.source != SOURCE_ZEROCONF
        ):
            # Probe the radio type if we don't have one yet
            if self._radio_mgr.radio_type is None:
                probe_result = await self._radio_mgr.detect_radio_type()
            else:
                probe_result = ProbeResult.RADIO_TYPE_DETECTED

            if probe_result == ProbeResult.WRONG_FIRMWARE_INSTALLED:
                return self.async_abort(
                    reason="wrong_firmware_installed",
                    description_placeholders={"repair_url": REPAIR_MY_URL},
                )
            if probe_result == ProbeResult.PROBING_FAILED:
                # This path probably will not happen now that we have
                # more precise USB matching unless there is a problem
                # with the device
                return self.async_abort(reason="usb_probe_failed")

            if self._radio_mgr.device_settings is None:
                return await self.async_step_manual_port_config()

            return await self.async_step_verify_radio()

        return self.async_show_form(
            step_id="confirm",
            description_placeholders={CONF_NAME: self._title},
        )

    async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult:
        """Handle usb discovery."""
        vid = discovery_info.vid
        pid = discovery_info.pid
        serial_number = discovery_info.serial_number
        manufacturer = discovery_info.manufacturer
        description = discovery_info.description
        dev_path = discovery_info.device

        await self._set_unique_id_and_update_ignored_flow(
            unique_id=f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}",
            device_path=dev_path,
        )

        # If they already have a discovery for deconz we ignore the usb discovery as
        # they probably want to use it there instead
        if self.hass.config_entries.flow.async_progress_by_handler(DECONZ_DOMAIN):
            return self.async_abort(reason="not_zha_device")
        for entry in self.hass.config_entries.async_entries(DECONZ_DOMAIN):
            if entry.source != SOURCE_IGNORE:
                return self.async_abort(reason="not_zha_device")

        self._radio_mgr.device_path = dev_path
        self._title = description or usb.human_readable_device_name(
            dev_path,
            serial_number,
            manufacturer,
            description,
            vid,
            pid,
        )
        self.context["title_placeholders"] = {CONF_NAME: self._title}
        return await self.async_step_confirm()

    async def async_step_zeroconf(
        self, discovery_info: ZeroconfServiceInfo
    ) -> ConfigFlowResult:
        """Handle zeroconf discovery."""

        # Transform legacy zeroconf discovery into the new format
        if discovery_info.type != ZEROCONF_SERVICE_TYPE:
            port = discovery_info.port or LEGACY_ZEROCONF_PORT
            name = discovery_info.name

            # Fix incorrect port for older TubesZB devices
            if "tube" in name and port == LEGACY_ZEROCONF_ESPHOME_API_PORT:
                port = LEGACY_ZEROCONF_PORT

            # Determine the radio type
            if "radio_type" in discovery_info.properties:
                radio_type = discovery_info.properties["radio_type"]
            elif "efr32" in name:
                radio_type = RadioType.ezsp.name
            elif "zigate" in name:
                radio_type = RadioType.zigate.name
            else:
                radio_type = RadioType.znp.name

            fallback_title = name.split("._", 1)[0]
            title = discovery_info.properties.get("name", fallback_title)

            discovery_info = ZeroconfServiceInfo(
                ip_address=discovery_info.ip_address,
                ip_addresses=discovery_info.ip_addresses,
                port=port,
                hostname=discovery_info.hostname,
                type=ZEROCONF_SERVICE_TYPE,
                name=f"{title}.{ZEROCONF_SERVICE_TYPE}",
                properties={
                    "radio_type": radio_type,
                    # To maintain backwards compatibility
                    "serial_number": discovery_info.hostname.removesuffix(".local."),
                },
            )

        try:
            discovery_props = ZEROCONF_PROPERTIES_SCHEMA(discovery_info.properties)
        except vol.Invalid:
            return self.async_abort(reason="invalid_zeroconf_data")

        radio_type = self._radio_mgr.parse_radio_type(discovery_props["radio_type"])
        device_path = f"socket://{discovery_info.host}:{discovery_info.port}"
        title = discovery_info.name.removesuffix(f".{ZEROCONF_SERVICE_TYPE}")

        await self._set_unique_id_and_update_ignored_flow(
            unique_id=discovery_props["serial_number"],
            device_path=device_path,
        )

        self.context["title_placeholders"] = {CONF_NAME: title}
        self._title = title
        self._radio_mgr.device_path = device_path
        self._radio_mgr.radio_type = radio_type
        self._radio_mgr.device_settings = DEVICE_SCHEMA(
            {
                CONF_DEVICE_PATH: device_path,
                CONF_BAUDRATE: 115200,
                CONF_FLOW_CONTROL: None,
            }
        )

        return await self.async_step_confirm()

    async def async_step_hardware(
        self, data: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle hardware flow."""
        try:
            discovery_data = HARDWARE_DISCOVERY_SCHEMA(data)
        except vol.Invalid:
            return self.async_abort(reason="invalid_hardware_data")

        name = discovery_data["name"]
        radio_type = self._radio_mgr.parse_radio_type(discovery_data["radio_type"])
        device_settings = discovery_data["port"]
        device_path = device_settings[CONF_DEVICE_PATH]
        self._flow_strategy = discovery_data.get("flow_strategy")

        await self._set_unique_id_and_update_ignored_flow(
            unique_id=f"{name}_{radio_type.name}_{device_path}",
            device_path=device_path,
        )

        self._title = name
        self._radio_mgr.radio_type = radio_type
        self._radio_mgr.device_path = device_path
        self._radio_mgr.device_settings = device_settings
        self.context["title_placeholders"] = {CONF_NAME: name}

        return await self.async_step_confirm()

    async def _async_create_radio_entry(self) -> ConfigFlowResult:
        """Create a config entry with the current flow state."""

        # ZHA is still single instance only, even though we use discovery to allow for
        # migrating to a new radio
        zha_config_entries = self.hass.config_entries.async_entries(
            DOMAIN, include_ignore=False
        )
        data = self._get_config_entry_data()

        if len(zha_config_entries) == 1:
            return self.async_update_reload_and_abort(
                entry=zha_config_entries[0],
                title=self._title,
                data=data,
                reload_even_if_entry_is_unchanged=True,
                reason="reconfigure_successful",
            )
        if not zha_config_entries:
            # Load network settings from the radio to get the EPID
            await self._radio_mgr.async_load_network_settings()
            assert self._radio_mgr.current_settings is not None

            unique_id = get_config_entry_unique_id(
                self._radio_mgr.current_settings.network_info
            )
            await self.async_set_unique_id(unique_id)

            return self.async_create_entry(
                title=self._title,
                data=data,
            )
        # This should never be reached
        return self.async_abort(reason="single_instance_allowed")


class ZhaOptionsFlowHandler(BaseZhaFlow, OptionsFlow):
    """Handle an options flow."""

    _migration_intent: OptionsMigrationIntent

    def __init__(self, config_entry: ConfigEntry) -> None:
        """Initialize options flow."""
        super().__init__()
        self._radio_mgr.device_path = config_entry.data[CONF_DEVICE][CONF_DEVICE_PATH]
        self._radio_mgr.device_settings = config_entry.data[CONF_DEVICE]
        self._radio_mgr.radio_type = RadioType[config_entry.data[CONF_RADIO_TYPE]]
        self._title = config_entry.title

    async def async_step_init(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Launch the options flow."""
        if user_input is not None:
            # Perform a backup first
            try:
                zha_gateway = get_zha_gateway(self.hass)
            except ValueError:
                pass
            else:
                # The backup itself will be stored in `zigbee.db`, which the radio
                # manager will read when the class is initialized
                application_controller = zha_gateway.application_controller
                await application_controller.backups.create_backup(load_devices=True)

            # Then unload the integration
            with suppress(OperationNotAllowed):
                # OperationNotAllowed: ZHA is not running
                await self.hass.config_entries.async_unload(self.config_entry.entry_id)

            return await self.async_step_prompt_migrate_or_reconfigure()

        return self.async_show_form(step_id="init")

    async def async_step_prompt_migrate_or_reconfigure(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Confirm if we are migrating adapters or just re-configuring."""

        return self.async_show_menu(
            step_id="prompt_migrate_or_reconfigure",
            menu_options=[
                OptionsMigrationIntent.RECONFIGURE,
                OptionsMigrationIntent.MIGRATE,
            ],
        )

    async def async_step_intent_reconfigure(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Virtual step for when the user is reconfiguring the integration."""
        self._migration_intent = OptionsMigrationIntent.RECONFIGURE
        return await self.async_step_choose_serial_port()

    async def async_step_intent_migrate(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Confirm the user wants to reset their current radio."""
        self._migration_intent = OptionsMigrationIntent.MIGRATE
        return await self.async_step_choose_serial_port()

    async def async_step_maybe_reset_old_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Erase the old radio's network settings before migration."""

        # If we are reconfiguring, the old radio will not be available
        if self._migration_intent is OptionsMigrationIntent.RECONFIGURE:
            return await self.async_step_restore_backup()

        return await super().async_step_maybe_reset_old_radio(user_input)

    async def _async_create_radio_entry(self):
        """Re-implementation of the base flow's final step to update the config."""

        # Avoid creating both `.options` and `.data` by directly writing `data` here
        self.hass.config_entries.async_update_entry(
            entry=self.config_entry,
            data=self._get_config_entry_data(),
            options=self.config_entry.options,
        )

        # Reload ZHA after we finish
        await self.hass.config_entries.async_setup(self.config_entry.entry_id)

        return self.async_abort(reason="reconfigure_successful")

    def async_remove(self):
        """Maybe reload ZHA if the flow is aborted."""
        if self.config_entry.state not in (
            ConfigEntryState.SETUP_ERROR,
            ConfigEntryState.NOT_LOADED,
        ):
            return

        self.hass.async_create_task(
            self.hass.config_entries.async_setup(self.config_entry.entry_id)
        )
